package com.xingfei.blog.util;

import com.xingfei.blog.dto.SecretMessageDTO;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Manually creates and destroys a topic-mode (publish/subscribe) consumer.
 *
 * <p>Each instance is bound to one {@code userId}. The id is used both as the JMS
 * client id of the connection and as the durable subscription name, so the
 * subscription on the hard-coded topic {@code "sample.topic"} is durable.
 *
 * <p>Typical usage: call {@link #initConsumer()} once to start receiving messages and
 * {@link #closeConsumer()} when done. No synchronization is performed by this class.
 *
 * Created by yhang on 2017/5/8.
 */
public class TopicConsumerUtil {

    /** User id; doubles as JMS client id and durable subscription name. */
    private final long userId;

    /** Broker URL handed to the ActiveMQ connection factory. */
    private final String brokerurl;

    /** Broker user name. */
    private final String user;

    /** Broker password. */
    private final String password;

    /**
     * @param userId    id used as client id and durable subscription name
     * @param brokerurl URL of the ActiveMQ broker
     * @param user      broker user name
     * @param password  broker password
     */
    public TopicConsumerUtil(long userId,String brokerurl,String user,String password){
        this.userId = userId;
        this.brokerurl = brokerurl;
        this.user = user;
        this.password = password;
    }

    /** Connection owned by this instance; {@code null} unless a consumer is active. */
    private Connection connection;

    /** Active durable subscriber; {@code null} unless {@link #initConsumer()} succeeded. */
    private MessageConsumer consumer;

    // topic mode

    /**
     * Opens a connection, creates the durable subscriber and attaches the message listener.
     *
     * <p>If a consumer is already active it is returned as-is instead of opening a second
     * connection with the same client id. If any step fails, the failure is printed, the
     * half-opened connection is closed again and the fields are left untouched.
     *
     * @return the active consumer, or {@code null} if it could not be created
     */
    public MessageConsumer initConsumer() {
        if (consumer != null) {
            // Already initialised: a second connection would reuse the same client id.
            return consumer;
        }

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, brokerurl);
        // NOTE(review): trusting all packages lets ObjectMessage payloads of any class be
        // deserialized. If messages can come from untrusted producers, consider restricting
        // this to the DTO packages via setTrustedPackages(...) -- confirm with the broker setup.
        factory.setTrustAllPackages(true);

        Connection newConnection = null;
        try {
            String subscriberId = String.valueOf(userId);

            newConnection = factory.createConnection();
            newConnection.setClientID(subscriberId); // required for a durable subscription
            newConnection.start();

            // Non-transacted session with automatic acknowledgement.
            Session session = newConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // Destination: the topic name is fixed.
            Topic topic = session.createTopic("sample.topic");

            // Durable subscription named after the user.
            MessageConsumer newConsumer = session.createDurableSubscriber(topic, subscriberId);
            newConsumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message msg) {
                    handleMessage(msg);
                }
            });

            // Publish to the fields only once everything succeeded.
            connection = newConnection;
            consumer = newConsumer;
        } catch (InvalidClientIDException e) {
            System.out.println(e.getMessage());
            closeQuietly(newConnection);
        } catch (Exception e) {
            e.printStackTrace();
            closeQuietly(newConnection);
        }
        return consumer;
    }

    /**
     * Closes the consumer and the underlying connection.
     *
     * <p>The connection is closed (not merely stopped) so that its session and client id
     * are released. The durable subscription itself is not unsubscribed. Calling this
     * method when nothing is open does nothing.
     */
    public void closeConsumer() {
        try {
            if (consumer != null) {
                consumer.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // Always release the connection, even if closing the consumer failed.
            closeQuietly(connection);
            consumer = null;
            connection = null;
        }
    }

    /**
     * Prints the content of a received message.
     *
     * <p>Text messages print a fixed marker line, object messages carrying a
     * {@link SecretMessageDTO} print its message content, anything else prints the
     * "cannot handle" line.
     *
     * @param msg the message delivered by the broker
     */
    private static void handleMessage(Message msg) {
        try {
            if (msg instanceof TextMessage) {
                // The body is read but currently not used.
                ((TextMessage) msg).getText();
                System.out.println("何时触发呢？？");
            } else if (msg instanceof ObjectMessage) {
                Object payload = ((ObjectMessage) msg).getObject();
                if (payload instanceof SecretMessageDTO) {
                    System.out.println(((SecretMessageDTO) payload).getMessageContent());
                } else {
                    // Null or unexpected payload type: report instead of throwing in the listener.
                    System.out.println("无法处理");
                }
            } else {
                System.out.println("无法处理");
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Closes the given connection, printing (not propagating) any JMS failure.
     *
     * @param conn connection to close; may be {@code null}
     */
    private static void closeQuietly(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
